How do you use Feature Store in the MLOps process on Vertex AI?

Ivan Nardini
Google Cloud - Community
12 min read · Dec 1, 2022

Figure 1 — Vertex AI Q&A series logo

One of the questions at the MLOps on Google Cloud CloudAir webinar was:

How do you use Feature Store in the MLOps process on Vertex AI?

Because this is not the first time I have received this question, I am taking it as an opportunity to write the first article of the Vertex AI Q&A series and talk about the role of the feature store along the MLOps process on Vertex AI. According to this whitepaper, these are the steps in the MLOps process that may involve a feature store:

  • Model experimentation, both feature engineering and model training
  • Continuous training
  • Online prediction

In this article, I will discuss in detail how a feature store contributes to each of those steps for predicting fraudulent transactions in a banking scenario. I will use Vertex AI Feature Store, the fully managed feature store service on Google Cloud. With it, I can provide code samples and leverage its integration with other Vertex AI services to illustrate some concepts and demonstrate how a feature store can support and improve your MLOps process on Google Cloud.

Model Experimentation and Feature store: store, monitor and consume your features

Feature Engineering and Feature store

Typically, before developing an ML model, you build features.

At scale, you may have the same features across several teams for many different ML applications.

For example, in a banking company, you may have the customer_nb_tx_last_month feature, the number of transactions per customer over the last month. That feature may be used both in the fraud detection ML model from the FraudFinder team and in a loan eligibility ML model from the LoanAnalyzer team.

It may also happen that different teams build different features to express the same property or characteristic.

In our banking company, different data scientists may calculate the terminal_risk_index_last_month feature, a risk index expressing how exposed a terminal is to fraudulent transactions, using different formulas.

To avoid having the same features computed multiple times across different teams and to guarantee feature consistency, you may decide to introduce a feature store. A feature store is a repository to store, organize and share features. It ingests features from data pipelines (batch and streaming) that calculate features each time raw data is available, and it makes your features discoverable and easily reusable by different teams. Once features are ingested, a feature store allows you to search and filter them and, more importantly, to track them by monitoring their distributions and detecting anomalies over time.

In fact, as soon as you have new data, you can compute new features. But because data may significantly change over time (drift), you need to monitor feature distributions and identify potential distribution changes that would affect the performance of the downstream models that consume those features.

In a fraud detection scenario, imagine that the bank acquires new customers in a district thanks to a new marketing campaign, causing a change in the district feature distribution. Now, let's assume that you trained an ML model that was not performing well in that district because of the limited number of examples, but the overall performance was good enough to convince your team to put it in production. When new fraudulent transactions happen in that district, without a feature store that lets you quickly monitor feature distributions such as district, it would take you and your team some time to clarify why the overall model performance significantly dropped.

Below you can see how the feature store provides storage, search and monitoring capabilities across several teams.

Figure 2 — Model Development and Feature store

Depending on your organization, you may have the following roles working with Feature store:

  • Platform administrators who are responsible for managing the feature store and its operations, such as creating its data model and defining feature access and monitoring policies (see the sketch after this list)
  • Data engineers responsible for creating and publishing features into Feature Store with data pipelines
  • Data analysts, data scientists, and machine learning engineers who define and consume features from Feature Store for training and serving ML models
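
For instance, as a platform administrator, you could create the feature store's data model with the Vertex AI Python SDK. Below is a minimal sketch, assuming a hypothetical fraudfinder feature store with a customer entity type:

from google.cloud import aiplatform

# Initialize the Vertex AI SDK for Python
aiplatform.init(project="your-project", location="us-central1")

# Create the feature store resource (the IDs below are hypothetical)
featurestore = aiplatform.Featurestore.create(
    featurestore_id="fraudfinder",
    online_store_fixed_node_count=1,
)

# Define an entity type for customers
customer = featurestore.create_entity_type(
    entity_type_id="customer",
    description="Bank customer",
)

# Register a feature on the entity type
customer.create_feature(
    feature_id="customer_nb_tx_last_month",
    value_type="INT64",
    description="Number of transactions per customer over the last month",
)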

If you are a data scientist, you can easily ingest your features from a Pandas DataFrame into Vertex AI Feature Store on Google Cloud. In fact, once a new feature store resource is available, you can use the following code from the Vertex AI Python SDK:

from typing import List

from pandas import DataFrame
from google.cloud import aiplatform


def ingest_features_df(
    project: str,
    location: str,
    featurestore_id: str,
    entity_type_id: str,
    features_ids: List[str],
    feature_time: str,
    features_df: DataFrame,
    entity_id_field: str,
) -> aiplatform.featurestore.EntityType:
    """
    Ingests features into a Featurestore from a Pandas DataFrame.
    Args:
        project: The Google Cloud project ID.
        location: The Google Cloud location.
        featurestore_id: The Featurestore ID.
        entity_type_id: The Entity Type ID.
        features_ids: The list of Feature IDs.
        feature_time: The name of the column that holds the feature timestamps.
        features_df: The Pandas DataFrame containing the features.
        entity_id_field: The name of the column that holds the entity IDs.
    Returns:
        The EntityType the features were ingested into.
    """
    # Initialize the Vertex AI SDK for Python
    aiplatform.init(project=project, location=location)

    # Get the entity type from an existing Featurestore
    entity_type = aiplatform.featurestore.EntityType(
        entity_type_id=entity_type_id, featurestore_id=featurestore_id
    )

    # Ingest the features
    entity_type.ingest_from_df(
        feature_ids=features_ids,
        feature_time=feature_time,
        df_source=features_df,
        entity_id_field=entity_id_field,
    )

    return entity_type

where

  • features_ids are the unique identifiers of the features whose values you want to import
  • feature_time is the column that holds the feature timestamp for each entity
  • features_df is the Pandas DataFrame containing your features
  • entity_id_field is the column that holds the unique identifier of each entity
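
For example, a call to this function could look like the following sketch. The project, feature store and column names are hypothetical:

import pandas as pd

# Hypothetical feature values for the "customer" entity type
features_df = pd.DataFrame(
    {
        "customer_id": ["c_001", "c_002"],
        "customer_nb_tx_last_month": [42, 7],
        "update_time": pd.to_datetime(["2022-11-30", "2022-11-30"]),
    }
)

entity_type = ingest_features_df(
    project="your-project",
    location="us-central1",
    featurestore_id="fraudfinder",
    entity_type_id="customer",
    features_ids=["customer_nb_tx_last_month"],
    feature_time="update_time",
    features_df=features_df,
    entity_id_field="customer_id",
)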

Below you can see what features look like in Vertex AI Feature Store in our fraud detection scenario.

Figure 3 — Feature store and Features view filtered by entity type

Notice that, if feature value monitoring is enabled, Vertex AI Feature Store also allows you to track how much a feature's value distribution changes over time, as described in the documentation.

Now you know that a feature store makes features reusable and helps you monitor them. But you may want to ask:

“You mentioned using a feature store when I need to scale features to many use cases. However, what if it is not the case? Can a feature store still be useful to train ML models?”

Yes, it can. Indeed, another challenge that the feature store solves is data leakage (or target leakage), which may affect your model.

Model Training and Feature store

In real-time scenarios you constantly get fresh data to compute features, but it may take some time before you collect the labels you want to predict.

Back to the fraud detection scenario, assume you have the number of transactions per terminal over the last month, terminal_nb_tx_last_month, as a feature. You may collect new transactions every hour and consequently recompute the feature value 24 times per day. But after the model you built predicts that a transaction is fraudulent, an investigation starts, and it takes time to confirm whether that transaction is actually a fraud or not.

In those scenarios, the training dataset you use to train your model is valid only if it includes features computed on data available before, and not after, the label is observed. Any feature computed on data available after the target has been collected would not be a correct representation of the feature values used to predict your target variable: you would be building your model with feature values that only become available after the time you collect the label. In other words, you would be using "the future to predict the present". As a consequence, your model may perform well at training time but register poor performance once it goes to production, generating training-serving skew.

Figure 4 — Data leakage in a picture

In our fraud detection scenario, if we assume that the terminal_nb_tx_last_month feature is highly positively correlated with our is_fraud target, in production the model will tend to predict fraudulent transactions that are not fraudulent, just because we associated the wrong feature values with our labels. That is how, in production, data leakage would degrade model precision.

With a feature store, you can prevent training-serving skew. A feature store allows you to get the most up-to-date view of the features computed over time before labels become available by using a point-in-time query. A point-in-time query retrieves, for each record, the feature values available up to the time you collected the label, preventing leakage of future feature values. Consequently, the resulting training dataset contains the feature values that were representative of each label at the time it was observed.
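
To build some intuition about what a point-in-time query does, here is a minimal sketch of the same logic in plain pandas using merge_asof: for each label, it picks the latest feature value computed at or before the label timestamp, so values computed afterwards never leak in. The data is hypothetical.

import pandas as pd

# Labels: each fraud investigation outcome with the time it was observed
labels = pd.DataFrame(
    {
        "terminal_id": ["t_01", "t_01"],
        "event_time": pd.to_datetime(["2022-11-10", "2022-11-20"]),
        "is_fraud": [0, 1],
    }
)

# Feature values recomputed over time for the same terminal
features = pd.DataFrame(
    {
        "terminal_id": ["t_01", "t_01", "t_01"],
        "feature_time": pd.to_datetime(["2022-11-05", "2022-11-15", "2022-11-25"]),
        "terminal_nb_tx_last_month": [120, 150, 300],
    }
)

# For each label, join the latest feature value at or before the label's
# timestamp ("backward"), so the 2022-11-25 value is never joined to either label
training_df = pd.merge_asof(
    labels.sort_values("event_time"),
    features.sort_values("feature_time"),
    left_on="event_time",
    right_on="feature_time",
    by="terminal_id",
    direction="backward",
)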

Below you will find how the feature store allows you to generate a training dataset using a point-in-time query.

Figure 5 — Model Training and Feature store

Making a point-in-time query with Vertex AI Feature Store is quite simple. When you need to train your model, you can use the Vertex AI SDK to fetch features and materialize them in a DataFrame. Here is the sample code.


from typing import Dict, List

from pandas import DataFrame
from google.cloud import aiplatform


def batch_serve_features_df(
    project: str,
    location: str,
    featurestore_id: str,
    serving_feature_ids: Dict[str, List[str]],
    read_instances_df: DataFrame,
    pass_through_fields: List[str],
) -> DataFrame:
    """
    Retrieves batch feature values from a Featurestore with a point-in-time query.
    Args:
        project: The Google Cloud project ID.
        location: The Google Cloud location.
        featurestore_id: The Featurestore ID.
        serving_feature_ids: The dictionary mapping Entity Type IDs to the lists of Feature IDs to retrieve.
        read_instances_df: The Pandas DataFrame containing the entities and timestamps to read feature values for.
        pass_through_fields: The list of extra fields, such as the label column, to pass through to the output.
    Returns:
        The Pandas DataFrame containing the training dataset.
    """
    # Initialize the Vertex AI SDK for Python
    aiplatform.init(project=project, location=location)

    # Get an existing Featurestore
    featurestore = aiplatform.featurestore.Featurestore(featurestore_name=featurestore_id)

    # Get data with a point-in-time query from the Featurestore
    df = featurestore.batch_serve_to_df(
        serving_feature_ids=serving_feature_ids,
        read_instances_df=read_instances_df,
        pass_through_fields=pass_through_fields,
    )

    return df

where

  • serving_feature_ids is the dictionary of the features to fetch from the feature store, keyed by entity type
  • read_instances_df is the Pandas DataFrame with the target variable, the serving timestamp and the entities you want to read
  • pass_through_fields are any other columns you want to carry over from the instances DataFrame

With this method, you can perform efficient point-in-time lookups and join all the required feature values from the feature store to the label (and other variables) to create a correct dataset to train your model.
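
In our fraud detection scenario, a call could look like the following sketch, where the read instances have one column per entity type plus a timestamp column. The project, feature store and column names are hypothetical:

import pandas as pd

# Hypothetical read instances: one row per label, with the timestamp at
# which each label was observed
read_instances_df = pd.DataFrame(
    {
        "customer": ["c_001", "c_002"],
        "terminal": ["t_01", "t_02"],
        "timestamp": pd.to_datetime(["2022-11-10", "2022-11-20"]),
        "is_fraud": [0, 1],
    }
)

training_df = batch_serve_features_df(
    project="your-project",
    location="us-central1",
    featurestore_id="fraudfinder",
    serving_feature_ids={
        "customer": ["customer_nb_tx_last_month"],
        "terminal": ["terminal_nb_tx_last_month"],
    },
    read_instances_df=read_instances_df,
    pass_through_fields=["is_fraud"],
)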

Continuous training and Feature Store: Feed your training pipelines

Training ML models is a repetitive task. To automate that task and continuously train your models, you build ML pipelines. An ML pipeline allows you to orchestrate and automate your model training in combination with some triggers.

Each time an ML pipeline is triggered, it needs to ingest new training data. In scenarios where you have a feature store, you create your training dataset by running a point-in-time query, which retrieves features based on the time you collected each label.

Depending on how you automate your training, one possibility is to create an ingestion component in your ML pipeline which fetches features from the feature store and consumes them to train your ML model. Below you can see how you can trigger a new ML pipeline which consumes a training dataset created by a point-in-time query in the feature store.

Figure 6 — Continuous training and Feature store

On Vertex AI, you use Vertex AI Pipelines to run ML pipelines in a serverless way using the Kubeflow Pipelines (KFP) or TensorFlow Extended (TFX) DSL. Those frameworks allow you to build an ingestion component that implements the point-in-time logic. Below you can see the code of the ingestion pipeline component you can build using KFP in our fraud detection scenario.

from typing import NamedTuple

from kfp.v2.dsl import component


@component(
    output_component_file="batch_serve_features_gcs.yaml",
    base_image="python:3.9",
    packages_to_install=["google-cloud-aiplatform"],
)
def batch_serve_features_gcs(
    feature_store_id: str,
    gcs_destination_output_uri_prefix: str,
    gcs_destination_type: str,
    serving_feature_ids: str,
    read_instances_uri: str,
    project: str,
    location: str,
) -> NamedTuple("Outputs", [("gcs_destination_output_uri_paths", str)]):
    # Import libraries (inside the body, so they are available at component runtime)
    import os
    from json import loads
    from typing import NamedTuple

    from google.cloud import aiplatform
    from google.cloud.aiplatform.featurestore import Featurestore

    # Initialize Vertex AI client
    aiplatform.init(project=project, location=location)

    # Initiate feature store
    featurestore = Featurestore(featurestore_name=feature_store_id)

    # Serve features in batch on GCS with a point-in-time query
    serving_feature_ids = loads(serving_feature_ids)
    featurestore.batch_serve_to_gcs(
        gcs_destination_output_uri_prefix=gcs_destination_output_uri_prefix,
        gcs_destination_type=gcs_destination_type,
        serving_feature_ids=serving_feature_ids,
        read_instances_uri=read_instances_uri,
    )

    # Store metadata: map the gs:// URI to the /gcs/ mount path and
    # build a wildcard path to the generated CSV files
    gcs_destination_output_path_prefix = gcs_destination_output_uri_prefix.replace("gs://", "/gcs/")
    gcs_destination_output_paths = os.path.join(gcs_destination_output_path_prefix, "*.csv")
    component_outputs = NamedTuple(
        "Outputs",
        [("gcs_destination_output_uri_paths", str)],
    )

    return component_outputs(gcs_destination_output_paths)

The component leverages the Vertex AI Python SDK to batch serve feature values from Vertex AI Feature Store. As input it takes the BigQuery URI of the table containing the new labels, and as output it returns the wildcard path of the generated files, which you can use to create a Vertex AI Dataset resource. In this way you store all the metadata associated with your new training data in Vertex ML Metadata for reproducibility.
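
To give an idea of how the component could be wired up, here is a minimal sketch of a pipeline that uses it, compiled and submitted with Vertex AI Pipelines. The bucket, BigQuery table and IDs are hypothetical, and the downstream training steps are omitted:

import json

from kfp.v2 import compiler, dsl
from google.cloud import aiplatform


@dsl.pipeline(name="fraudfinder-training")
def pipeline(project: str = "your-project", location: str = "us-central1"):
    # Materialize a fresh training dataset with a point-in-time query
    batch_serve_features_gcs(
        feature_store_id="fraudfinder",
        gcs_destination_output_uri_prefix="gs://your-bucket/training-data",
        gcs_destination_type="csv",
        serving_feature_ids=json.dumps({"customer": ["customer_nb_tx_last_month"]}),
        read_instances_uri="bq://your-project.fraudfinder.ground_truth",
        project=project,
        location=location,
    )
    # ...downstream components would create the Vertex AI Dataset and train the model


# Compile the pipeline and submit a run
compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

aiplatform.PipelineJob(
    display_name="fraudfinder-training",
    template_path="pipeline.json",
).submit()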

Online prediction and Feature Store: serve your features

Let’s assume that you have an ML pipeline which ingests features from a feature store and trains your model. Now that you have your model, you want to deploy it to generate real-time predictions. As you can imagine, each time a new prediction request comes in, you need to pass the feature values the model expects as inputs to generate a real-time prediction.

There are some scenarios where, rather than creating real-time feature pipelines, you might want to calculate, store and make features available to incoming prediction requests with low latency. For example, the FraudFinder team would consider computing the customer_nb_tx_last_day feature (the number of transactions over the last entire day) in a data warehouse rather than in a streaming data pipeline. In those scenarios where you compute features server-side, data warehouses such as BigQuery are not optimized to serve them in real time at scale. That is why you need a different service that allows feature lookups and returns a single row with many columns per data entity, such as customer or terminal in the fraud detection use case.

Finally, even if you can calculate features when you deploy a model, you need to be sure to reproduce the same preprocessing steps you had when you trained your model. If you cannot, a skew between training and serving data could again arise and severely impact your model performance. To mitigate this bias, you can eliminate the online preprocessing steps and serve the same aggregated features you already had during training to generate an online prediction.

Both of these scenarios provide valuable reasons to introduce a feature store: a service that helps you serve the same aggregated features available at training time, at scale and with low latency. And because it serves the same features you consume to train your ML models, it helps mitigate possible training-serving skew.

In the following picture, you can see how a feature store allows users to serve features for generating online predictions.

Figure 7 — Prediction serving and Feature Store

Using the Vertex AI Python SDK, you can easily integrate feature serving into your serving service. Below is a simple example of how to apply the read method to retrieve features from Vertex AI Feature Store.

from typing import List

from pandas import DataFrame
from google.cloud import aiplatform


def online_serve_feature_values(
    project: str,
    location: str,
    featurestore_id: str,
    entity_type_id: str,
    entity_ids: List[str],
    feature_ids: List[str],
) -> DataFrame:
    """
    Retrieves online feature values from a Featurestore.
    Args:
        project: The Google Cloud project ID.
        location: The Google Cloud location.
        featurestore_id: The Featurestore ID.
        entity_type_id: The Entity Type ID.
        entity_ids: The list of Entity IDs.
        feature_ids: The list of Feature IDs.
    Returns:
        A Pandas DataFrame containing the feature values.
    """
    # Initialize the Vertex AI SDK for Python
    aiplatform.init(project=project, location=location)

    # Get the entity type from an existing Featurestore
    entity_type = aiplatform.featurestore.EntityType(
        entity_type_id=entity_type_id, featurestore_id=featurestore_id
    )

    # Retrieve the feature values
    feature_values = entity_type.read(entity_ids=entity_ids, feature_ids=feature_ids)

    return feature_values

where

  • entity_ids is the list of identifiers of the entities whose feature values you want to read
  • feature_ids is the list of identifiers of the features you want to retrieve

The method returns the latest value of each feature, which means the feature values with the most recent timestamp are returned by default.
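
As a sketch of how this could fit into a serving service, the retrieved values can be forwarded to a model deployed behind a Vertex AI endpoint. The endpoint ID and feature names below are hypothetical, and the instance format depends on your model:

from google.cloud import aiplatform

# Read the latest feature values for the entity in the incoming request
feature_values = online_serve_feature_values(
    project="your-project",
    location="us-central1",
    featurestore_id="fraudfinder",
    entity_type_id="customer",
    entity_ids=["c_001"],
    feature_ids=["customer_nb_tx_last_month"],
)

# Forward them to the deployed model; the read method returns one row per
# entity with an entity_id column plus one column per feature
endpoint = aiplatform.Endpoint("1234567890")  # hypothetical endpoint ID
instances = feature_values.drop(columns=["entity_id"]).to_dict(orient="records")
prediction = endpoint.predict(instances=instances)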

Summary

One of the questions at the MLOps on Google Cloud CloudAir webinar was:

How do you use Feature Store in the MLOps process on Vertex AI?

Based on what I have learned and experienced, these are the steps in the MLOps process that may involve a feature store:

Figure 8 — How do you use the Feature Store in the MLOps process on Vertex AI?

It is important to say that not all ML scenarios require a feature store. I would suggest referring to this article from Lak Lakshmanan to evaluate whether you really need one.

What’s next

I hope you got a better understanding of the role of the feature store along the MLOps process on Google Cloud. If so, you may want to get started with it on Vertex AI. Check out the documentation and this repository to learn more. In case you have further questions about the feature store or any other Vertex AI service, feel free to leave a comment or reach out on LinkedIn or Twitter.

Thanks to Kaz Sato, Rajesh Thallam, Gabriele Randelli and all colleagues for feedback and suggestions.
